// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module infinity_core:compaction_process.impl;

import :compaction_process;
import :bg_task;
import :logger;
import :infinity_exception;
import :blocking_queue;
import :infinity_context;
import :base_memindex;
import :status;
import :wal_manager;
import :txn_state;
import :new_txn_manager;
import :new_txn;
import :new_compaction_alg;
import :table_meta;
import :db_meta;
import :segment_meta;
import :bg_task;
import :base_txn_store;

import std;
import third_party;

import global_resource_usage;
import compilation_config;

namespace infinity {

CompactionProcessor::CompactionProcessor() {
#ifdef INFINITY_DEBUG
    GlobalResourceUsage::IncrObjectCount("CompactionProcessor");
#endif
}

CompactionProcessor::~CompactionProcessor() {
#ifdef INFINITY_DEBUG
    GlobalResourceUsage::DecrObjectCount("CompactionProcessor");
#endif
}

void CompactionProcessor::Start() {
    LOG_INFO("Compaction processor is started.");
    processor_thread_ = std::thread([this] { Process(); });
}

void CompactionProcessor::Stop() {
    LOG_INFO("Compaction processor is stopping.");
    std::shared_ptr<StopProcessorTask> stop_task = std::make_shared<StopProcessorTask>();
    this->Submit(stop_task);
    stop_task->Wait();
    processor_thread_.join();
    // Ensure all pending tasks are processed before shutdown
    while (!task_queue_.Empty()) {
        std::deque<std::shared_ptr<BGTask>> tasks;
        task_queue_.DequeueBulk(tasks);
        for (const auto &bg_task : tasks) {
            if (bg_task->type_ != BGTaskType::kStopProcessor) {
                // Skip processing any remaining tasks during shutdown
                continue;
            }
        }
    }
    LOG_INFO("Compaction processor is stopped.");
}

void CompactionProcessor::Submit(const std::shared_ptr<BGTask> &bg_task) {
    task_queue_.Enqueue(bg_task);
    ++task_count_;
}

void CompactionProcessor::NewNotifyCompact() {
    LOG_TRACE("Background task triggered compaction");
    auto *new_txn_mgr = InfinityContext::instance().storage()->new_txn_manager();
    std::vector<std::pair<std::string, std::string>> db_table_names;
    {
        auto *new_txn = new_txn_mgr->BeginTxn(std::make_unique<std::string>("list table for compaction"), TransactionType::kRead);

        std::vector<std::string> db_names;
        Status status = new_txn->ListDatabase(db_names);
        if (!status.ok()) {
            UnrecoverableError(status.message());
        }
        for (const auto &db_name : db_names) {
            std::vector<std::string> table_names;
            status = new_txn->ListTable(db_name, table_names);
            if (!status.ok()) {
                UnrecoverableError(status.message());
            }
            for (const auto &table_name : table_names) {
                db_table_names.emplace_back(db_name, table_name);
            }
        }
        status = new_txn_mgr->CommitTxn(new_txn);
        if (!status.ok()) {
            UnrecoverableError(status.message());
        }
    }

    auto compact_table = [&](const std::string &db_name, const std::string &table_name, std::shared_ptr<BGTaskInfo> &bg_task_info) {
        auto new_txn_shared = new_txn_mgr->BeginTxnShared(std::make_unique<std::string>(fmt::format("compact table {}.{}", db_name, table_name)),
                                                          TransactionType::kCompact);
        LOG_INFO(fmt::format("Compact begin ts: {}", new_txn_shared->BeginTS()));
        Status status = Status::OK();
        DeferFn defer_fn([&] {
            if (status.ok()) {
                Status commit_status = new_txn_mgr->CommitTxn(new_txn_shared.get());
                if (!commit_status.ok()) {
                    LOG_ERROR(fmt::format("Commit compact table {}.{} failed: {}", db_name, table_name, commit_status.message()));
                }

                CompactTxnStore *compact_txn_store = static_cast<CompactTxnStore *>(new_txn_shared->GetTxnStore());
                if (compact_txn_store != nullptr) {
                    // Record compact info
                    std::string task_text = fmt::format("Txn: {}, commit: {}, compact table: {}.{} with segments: {} into {}",
                                                        new_txn_shared->TxnID(),
                                                        new_txn_shared->CommitTS(),
                                                        db_name,
                                                        table_name,
                                                        fmt::join(compact_txn_store->deprecated_segment_ids_, ","),
                                                        compact_txn_store->new_segment_id_);

                    bg_task_info->task_info_list_.emplace_back(task_text);
                    if (commit_status.ok()) {
                        bg_task_info->status_list_.emplace_back("OK");
                    } else {
                        bg_task_info->status_list_.emplace_back(commit_status.message());
                    }
                }
            } else {
                LOG_ERROR(fmt::format("Compact table {}.{} failed: {}", db_name, table_name, status.message()));
                Status rollback_status = new_txn_mgr->RollBackTxn(new_txn_shared.get());
                if (!rollback_status.ok()) {
                    UnrecoverableError(rollback_status.message());
                }
                std::string task_text = fmt::format("Compact table: {}.{}", db_name, table_name);
                bg_task_info->task_info_list_.emplace_back(task_text);
                bg_task_info->status_list_.emplace_back(status.message());
            }
        });

        std::shared_ptr<DBMeta> db_meta;
        std::shared_ptr<TableMeta> table_meta;
        TxnTimeStamp create_timestamp;
        status = new_txn_shared->GetTableMeta(db_name, table_name, db_meta, table_meta, create_timestamp);
        if (!status.ok()) {
            return;
        }
        std::vector<SegmentID> segment_ids;
        {
            std::vector<SegmentID> *segment_ids_ptr = nullptr;
            std::tie(segment_ids_ptr, status) = table_meta->GetSegmentIDs1();
            if (!status.ok()) {
                UnrecoverableError(status.message());
            }
            segment_ids = *segment_ids_ptr;
            if (segment_ids.empty()) {
                LOG_TRACE(fmt::format("No segment to compact for table: {}.{}", db_name, table_name));
                return;
            }
            SegmentID unsealed_id = 0;
            status = table_meta->GetUnsealedSegmentID(unsealed_id);
            if (!status.ok()) {
                if (status.code() == ErrorCode::kNotFound) {
                    status = Status::OK(); // Ignore the error.
                } else {
                    UnrecoverableError(status.message());
                }
            } else {
                segment_ids.erase(std::remove(segment_ids.begin(), segment_ids.end(), unsealed_id), segment_ids.end());
            }
        }

        if (segment_ids.empty()) {
            LOG_TRACE(fmt::format("No segment to compact for table: {}.{}", db_name, table_name));
            return;
        }

        std::vector<SegmentID> compactible_segment_ids = GetCompactableSegments(*table_meta, segment_ids);
        if (!compactible_segment_ids.empty()) {
            status = new_txn_shared->Compact(db_name, table_name, compactible_segment_ids);
        }
    };

    if (db_table_names.empty()) {
        LOG_TRACE("No table to compact.");
    } else {
        std::shared_ptr<BGTaskInfo> bg_task_info = std::make_shared<BGTaskInfo>(BGTaskType::kNotifyCompact);
        for (const auto &[db_name, table_name] : db_table_names) {
            compact_table(db_name, table_name, bg_task_info);
        }
        if (!bg_task_info->task_info_list_.empty()) {
            new_txn_mgr->AddTaskInfo(bg_task_info);
        }
    }
}

Status CompactionProcessor::NewManualCompact(NewTxn *new_txn, const std::string &db_name, const std::string &table_name) {
    auto *new_txn_mgr = InfinityContext::instance().storage()->new_txn_manager();
    new_txn_mgr->UpdateTxnBeginTSAndKVInstance(new_txn);

    std::unique_ptr<std::string> result_msg;
    LOG_TRACE(fmt::format("Compact command triggered compaction: {}.{}", db_name, table_name));
    std::shared_ptr<DBMeta> db_meta;
    std::shared_ptr<TableMeta> table_meta;
    TxnTimeStamp create_timestamp;
    Status status = new_txn->GetTableMeta(db_name, table_name, db_meta, table_meta, create_timestamp);
    if (!status.ok()) {
        return status;
    }
    std::vector<SegmentID> segment_ids;
    {
        std::vector<SegmentID> *segment_ids_ptr = nullptr;
        std::tie(segment_ids_ptr, status) = table_meta->GetSegmentIDs1();
        if (!status.ok()) {
            return status;
        }
        segment_ids = *segment_ids_ptr;
        SegmentID unsealed_id = 0;
        status = table_meta->GetUnsealedSegmentID(unsealed_id);
        if (!status.ok()) {
            if (status.code() != ErrorCode::kNotFound) {
                return status;
            }
        } else {
            segment_ids.erase(std::remove(segment_ids.begin(), segment_ids.end(), unsealed_id), segment_ids.end());
        }
    }

    std::vector<SegmentID> compactible_segment_ids = GetCompactableSegments(*table_meta, segment_ids);
    if (!compactible_segment_ids.empty()) {
        status = new_txn->Compact(db_name, table_name, compactible_segment_ids);
        if (!status.ok()) {
            return status;
        }
        result_msg = std::make_unique<std::string>(fmt::format("Compact segments {} into new segment", fmt::join(compactible_segment_ids, ",")));
    } else {
        result_msg = std::make_unique<std::string>("No segment to compact");
    }

    return Status(ErrorCode::kOk, std::move(result_msg));
}

void CompactionProcessor::Process() {
    bool running = true;
    while (running) {
        std::deque<std::shared_ptr<BGTask>> tasks;
        task_queue_.DequeueBulk(tasks);

        for (const auto &bg_task : tasks) {
            switch (bg_task->type_) {
                case BGTaskType::kStopProcessor: {
                    running = false;
                    break;
                }
                case BGTaskType::kManualCompact: {
                    // Triggered by compact command
                    StorageMode storage_mode = InfinityContext::instance().storage()->GetStorageMode();
                    if (storage_mode == StorageMode::kUnInitialized) {
                        UnrecoverableError("Uninitialized storage mode");
                    }
                    if (storage_mode == StorageMode::kWritable) {
                        LOG_DEBUG("Command compact start.");
                        auto *compact_task = static_cast<ManualCompactTask *>(bg_task.get());
                        compact_task->result_status_ = NewManualCompact(compact_task->new_txn_, compact_task->db_name_, compact_task->table_name_);
                        LOG_DEBUG("Command compact end.");
                    }
                    break;
                }
                case BGTaskType::kNotifyCompact: {
                    // Triggered by periodic thread
                    StorageMode storage_mode = InfinityContext::instance().storage()->GetStorageMode();
                    if (storage_mode == StorageMode::kUnInitialized) {
                        UnrecoverableError("Uninitialized storage mode");
                    }
                    if (storage_mode == StorageMode::kWritable) {
                        LOG_DEBUG("Periodic compact start.");
                        NewNotifyCompact();
                        LOG_DEBUG("Periodic compact end.");
                    }
                    break;
                }
                default: {
                    UnrecoverableError(fmt::format("Invalid background task: {}", (u8)bg_task->type_));
                    break;
                }
            }
            bg_task->Complete();
        }
        task_count_ -= tasks.size();
        tasks.clear();
    }
}

std::vector<SegmentID> CompactionProcessor::GetCompactableSegments(TableMeta &table_meta, const std::vector<SegmentID> &segment_ids) {
    auto compaction_alg = NewCompactionAlg::GetInstance();

    for (SegmentID segment_id : segment_ids) {
        SegmentMeta segment_meta(segment_id, table_meta);
        auto [segment_row_cnt, segment_status] = segment_meta.GetRowCnt1();
        if (!segment_status.ok()) {
            UnrecoverableError(segment_status.message());
        }
        compaction_alg->AddSegment(segment_id, segment_row_cnt);
    }

    return compaction_alg->GetCompactableSegments();
}

} // namespace infinity
